package cn.doitedu.rtdw.rt_report;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @Author: deep as the sea
 * @Site: <a href="www.51doit.com">多易教育</a>
 * @QQ: 657270652
 * @Date: 2023/2/3
 * @Desc: 学大数据，到多易教育
 *
 * 页面 page006,page007,page010: 当天累计到此刻的pv量，uv量（每10分钟更新一次）
 *
 **/
public class PromotionRealTimeReport01 {

    public static void main(String[] args) {
        // 构建编程环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);
        env.getCheckpointConfig().setCheckpointStorage("file:/d:/ckpt");
        env.setParallelism(1);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 读取dwd层的明细宽表数据（创建kafka逻辑映射表）
        tEnv.executeSql(
         " CREATE TABLE mall_events_commondim_kfksource(          "
                 + "     user_id           INT,                            "
                 + "     username          string,                         "
                 + "     session_id        string,                         "
                 + "     event_id          string,                         "
                 + "     event_time        bigint,                         "
                 + "     lat               double,                         "
                 + "     lng               double,                         "
                 + "     release_channel   string,                         "
                 + "     device_type       string,                         "
                 + "     properties        map<string,string>,             "
                 + "     register_phone    STRING,                         "
                 + "     user_status       INT,                            "
                 + "     register_time     TIMESTAMP(3),                   "
                 + "     register_gender   INT,                            "
                 + "     register_birthday DATE, register_province STRING, "
                 + "     register_city STRING, register_job STRING, register_source_type INT,   "
                 + "     gps_province   STRING, gps_city STRING, gps_region STRING,             "
                 + "     page_type   STRING, page_service STRING,         "
                 + "     rt as to_timestamp_ltz(event_time,3) ,           "
                 + "     watermark for  rt as rt - interval '0' seconds   "
                 + " ) WITH (                                             "
                 + "  'connector' = 'kafka',                              "
                 + "  'topic' = 'mall-events-wide',                     "
                 + "  'properties.bootstrap.servers' = 'doitedu:9092',    "
                 + "  'properties.group.id' = 'testGroup',                "
                 + "  'scan.startup.mode' = 'latest-offset',            "
                 + "  'value.format'='json',                              "
                 + "  'value.json.fail-on-missing-field'='false',         "
                 + "  'value.fields-include' = 'EXCEPT_KEY')              ");

        // 创建输出结果目标mysql表的 逻辑映射表
        tEnv.executeSql(
                " CREATE TABLE promotion_rtrpt_01_sink (            "
                        +"   window_start timestamp(3),                      "
                        +"   window_end   timestamp(3),                      "
                        +"   page_url STRING,                                "
                        +"   pv   BIGINT,                                    "
                        +"   uv   BIGINT,                                    "
                        +"   PRIMARY KEY (page_url) NOT ENFORCED             "
                        +" ) WITH (                                          "
                        +"    'connector' = 'jdbc',                          "
                        +"    'url' = 'jdbc:mysql://doitedu:3306/realtimedw',"
                        +"    'table-name' = 'promotion_rtrpt_01',           "
                        +"    'username' = 'root',                           "
                        +"    'password' = 'root'                            "
                        +" )                                                 "
        );

        // 统计sql,并插入
        tEnv.executeSql(
                " INSERT INTO promotion_rtrpt_01_sink                                             "
                        +" with tmp AS (                                                                   "
                        +"   SELECT                                                                        "
                        +"     user_id,                                                                    "
                        +"     properties['url'] as page_url,                                                   "
                        +"     rt                                                                          "
                        +"   FROM mall_events_commondim_kfksource                                          "
                        +"   WHERE event_id = 'page_load'                                                  "
                        +"     AND properties['url'] in ('page006' , 'page007' , 'page010')                "
                        +" )                                                                               "
                        +"                                                                                 "
                        +" SELECT                                                                          "
                        +"   window_start,                                                                 "
                        +"   window_end,                                                                   "
                        +"   page_url,                                                                     "
                        +"   count(1) as pv,                                                               "
                        +"   count(distinct user_id) as uv                                                 "
                        +" FROM TABLE(                                                                     "
                        +"  CUMULATE(TABLE tmp, DESCRIPTOR(rt),INTERVAL '10' MINUTES, INTERVAL '24' HOURS) "
                        +" )                                                                               "
                        +" GROUP BY                                                                        "
                        +"   window_start,                                                                 "
                        +"   window_end,                                                                   "
                        +"   page_url                                                                      "
        );


    }
}
